Python ジェネレータ - イテレータから協調型マルチタスクまで - 3
著者:Leonardo Giordani - 29/03/2013
はじめに
この記事では、ジェネレーターがどのように協調型マルチタスク・システムの基盤となるかを明らかにし、それを実装したコードをいくつか紹介します。この話題に入る前に、ジェネレータのもう一つの興味深い使い方、すなわちジェネレータ式の連鎖について簡単に説明します。
ジェネレータ式のチェーン化
PyCon 2008で、"Python Essential Reference "の著者であるDavid M. Beazley氏は、システム管理におけるジェネレータの使用について、非常に興味深いスピーチを行いました。
Davidは、一度に1つの要素を生成するジェネレータは連鎖可能であるという考察から始めています。つまり、あるジェネレータ式は別のジェネレータ式を包含することができるということです。このようにして彼は、データセットの「フィルター」として機能するコンポーネントを、非常にコンパクトで再利用可能な方法で書く方法を示しています。このようにして、「一つのことをうまくやる」ツールを構築し、必要な動作を得るために後からそれらを連鎖させるというUnixの哲学に従っています。
Microthread: 協調的マルチタスク
免責事項:ここで紹介したコンセプトとコードは、Kamaeliaプロジェクトから大きな影響を受けています。それはこちらでご覧いただけます。 ここでは、(Python)ジェネレータを使用して、協調型マルチタスクのコンセプトに基づいたアプリケーションを簡単に構築する方法を見ていきましょう。読者の皆さんは、先取りやスレッドベースのマルチタスクの概念に精通しており、そのようなソリューションの長所と短所を知っていると思います。
協調型マルチタスクでは、アプリケーションが任意の時間経過の間、CPUの制御を保持し、スケジューラーに自発的にリソースを解放するのを待つことができます。これは、アプリケーションとの事前の合意なしに、スケジューラーがアプリケーションの停止と再開を担当するという、現代のマルチタスクのアプローチとは大きく異なるものです。
アプリケーションが自発的に停止できるようになったため、共有データの保護、アトミック性、同期に関するすべての問題が解消されないまでも、大幅に簡素化されます。しかし、アプリケーションには、実行を停止して内部状態を保存し、後で同じ地点から再開するためのメカニズムが必要です。
実際、ジェネレータは yield ステートメントによってこの動作を実装しているため、協調的マルチタスクに基づくシステムを構築するために使用することができます。ここでは、プロセスをマイクロスレッドと呼び、スレッドの軽量な形態であることを強調しています。
マイクロスレッド
このようなシステムの簡単な実装を見てみましょう。まず、マイクロスレッド・オブジェクトが必要です。これは、他の類似したオブジェクトと同時に実行できるオブジェクトですが、協調的な方法で実行されます。
code: pytohn
class MicroThread(object):
def main(self):
while 1:
yield 1
このオブジェクトのインスタンスは、呼ばれるとジェネレータを返すmain()メソッドを公開しています。後者はnext()メソッドが呼ばれるたびに1を返し、同時にyield文の直後に実行が凍結され、まだ無限のwhileループの中にいます。
このオブジェクトは直接テストすることができます。
code: python
>> mt = MicroThread()
>> g = mt.main()
>> g
<generator object main at 0xb74331e4>
>> g.next()
1
オブジェクトをより簡単に継承・拡張できるようにするために、少しリファクタリングします。
code: python
class MicroThread(object):
def step(self):
pass
def create(self):
pass
def main(self):
self.create()
yield 1
while 1:
self.step()
yield 1
このような変更により、クラスを継承し、create()メソッドとstep()メソッドをオーバーライドするだけで、クラスを拡張することができるようになりました。前者はmain()が呼び出されると同時に呼び出され、遅延初期化の役割を果たし、後者はnext()が呼び出されるたびに、yieldでコードが凍結される直前に実行されます。注意していただきたいのは、create()がジェネレーター関数内で呼び出されるため、ジェネレーターが生成された後にnext()を一度呼び出して実行しなければならないということです。つまり、このオブジェクトを使った標準的なワークフローは次のようになります。
code: python
# Instance the object
mt = MicroThread()
# Create the generator
g = mt.main()
# Initialize it
g.next()
# Loop over it
g.next()
g.next()
...
main() は生成関数なので、他の生成関数と同じように動作し、StopIteration 例外を発生させて、その使用が終了したことを通知しなければなりません。オーバーライドされたstep() メソッドは、マイクロスレッドを終了させるために、任意の時点(複数の場合も含む)でこの例外を発生させることができます。
スケジューラ
次に、実行中のタスクを管理するシステム・コンポーネントであるスケジューラが必要です。本当のマルチタスクシステムでは、スケジューラは大きくて複雑なコンポーネントですが、協調的な環境ではむしろシンプルなものになります。タスクと次のタスクの間に、スケジューラは他の機能を実行することができますが、その基本的なワークフローは非常に単純です。当然ながら、スケジューラはタスクによって発生する可能性のあるStopIteration例外を処理し、実行中のマイクロスレッドのリストから削除します。
スケジューラのコアは以下のようなものになります。
code: python
for thread in active_microthreads:
try:
thread.next()
scheduled_microthreads.append(thread)
except StopIteration:
pass
active_microthreads = scheduled_microthreads
scheduled_microthreads = []
このスニペットは、上述の動作を網羅しています。現在のループで実行されるすべてのタスクを含む active_microthreads と、次のループで実行されるすべてのタスクを含む scheduled_microthreads という 2 つのリストがあります。スケジューラの各ループで、active_microthreads内のすべてのマイクロスレッドが実行されます。この後、スレッドは再びスケジューリングされ、schedule_microthreadsリストに追加されます。スレッドが実行中にStopIteration例外を発生させた場合は、単に再スケジューリングされません。active_microthreadsリストがなくなるとループが終了し、スケジュールされたスレッドのリストがactive_threadsリストに移され、その後ループが再び始まります。
したがって、スケジューラの最初の実装は次のようになります。
code: python
class Scheduler(object):
def __init__(self):
self.active_microthreads = []
self.scheduled_microthreads = []
def add_microthread(self, mthread):
g = mthread.main()
g.next()
self.active_microthreads.append(g)
def run(self):
while 1:
for thread in self.active_microthreads:
try:
thread.next()
self.scheduled_microthreads.append(thread)
except StopIteration:
pass
self.active_microthreads = self.scheduled_microthreads
self.scheduled_microthreads = []
__init__()メソッドは、上述した2つの内部リストを初期化します。add_microthread()メソッドでは、スケジューラにマイクロスレッドを追加することができます。このメソッドは、追加した各マイクロスレッド上で main() を呼び出してそのジェネレータを取得した後、後者の上で next() を一度呼び出して初期化し、最後にスケジュールされたタスクのリストに追加します。
スケジューラのロジックは run()メソッドに実装され、このメソッドは上記のコア・コードを無限の while ループで実行します。
以下の簡単なコードで、マイクロスレッドとスケジューラをテストできます。
code: python
import mthread
import scheduler
import time
class TestMicroThread(mthread.MicroThread):
def __init__(self, number):
self.num = number
def step(self):
print "Number:", self.num
time.sleep(1)
mt1 = TestMicroThread(1)
mt2 = TestMicroThread(2)
mt3 = TestMicroThread(3)
ms = scheduler.Scheduler()
ms.add_microthread(mt1)
ms.add_microthread(mt2)
ms.add_microthread(mt3)
ms.run()
ここでは、TestMicroThread はマイクロスレッドですが、step()メソッドは数値を表示して 1 秒待つように再実装されています。3 つのマイクロスレッドをインスタンス化してスケジューラに追加し、スケジューラの run() メソッドを実行します。当然のことながら、結果は次のようになります。
code: bash
$ python test_scheduler.py
Number: 1
Number: 2
Number: 3
Number: 1
Number: 2
Number: 3
協調型マルチタスクシステムで期待されるように、3 つのマイクロスレッドがラウンドロビン方式で実行されます。
注:この記事で示したすべてのマイクロスレッドは、コードをフリーズさせるために yield 1 を実行するだけですが、yield は return 文と同様に任意のオブジェクトを返すことができ、これを利用してマイクロスレッドとスケジューラ間の通信を強化することができます。
マイクロスケジューラ
スケジューラは、より柔軟性があり、マイクロスレッド自体に変換することができます。スケジューラは、実行されるとジェネレーターを返し、next()メソッドが呼び出されるたびに、そのマイクロスレッドの1つが実行されます。この後、スケジューラはフリーズして制御を戻します。
code: python
class MicroScheduler(object):
def __init__(self):
self.active_microthreads = []
self.scheduled_microthreads = []
def add_microthread(self, mthread):
g = mthread.main()
g.next()
self.active_microthreads.append(g)
def main(self):
yield 1
while 1:
if len(self.active_microthreads) == 0:
yield 1
for thread in self.active_microthreads:
try:
thread.next()
self.scheduled_microthreads.append(thread)
except StopIteration:
pass
yield 1
self.active_microthreads = self.scheduled_microthreads
self.scheduled_microthreads = []
任意のマイクロスレッド・インターフェースに合わせて、run() の名前を main()に変更し、いくつかのyield文を追加するだけで十分です。このスケジューラには create() メソッドがありませんが、ある場合はここで呼ばれるべきです。2 番目の降伏は、スケジューラにマイクロスレッドが含まれていない場合に呼び出され、何もすることがないからです。3 番目の yield は、マイクロスレッド実行部分の各ループの後に呼び出されます。
このような小さな変更により、スケジューラを別のスケジューラに実行することができ、複雑なシステムを簡単に構築するための階層を作ることができます。同時に、スケジューラは、forループの中で next() メソッドを呼び出すだけで、通常通り使用することができます。
code: python
import mthread
import mscheduler
import time
class TestMicroThread(mthread.MicroThread):
def __init__(self, number):
self.num = number
def step(self):
print "Number:", self.num
time.sleep(1)
mt1 = TestMicroThread(1)
mt2 = TestMicroThread(2)
mt3 = TestMicroThread(3)
ms = mscheduler.MicroScheduler()
ms.add_microthread(mt1)
ms.add_microthread(mt2)
ms.add_microthread(mt3)
for i in ms.main():
pass
この例は明らかに非常に単純です。しかし、協調システムのコンポーネントを構築し、それらを実行空間に共存させることがいかに簡単であるかを示しています。スケジューラを別の方法で実行すると、例えば別のジェネレータの中に新しいマイクロスレッドを追加することができます。これにより、入力されるサービス要求などの特定のニーズを管理するために、システムコンポーネントをオンザフライでインスタンス化することができます。
まとめ
明らかに、この種のマルチタスクは、デスクトップOSやWebサーバーで使用されているようなインタラクティブな実行を提供することはできませんが、人間のユーザーは自分の行動のフィードバックを即座に受け取る必要があります。しかし、タイミングを必要とせずにタスクを同時に実行しなければならないシステムでは、協調型マルチタスクは、そのシンプルさゆえに貴重なソリューションとなります。
もう1つの興味深いシナリオは、各タスクが小さな協調コンポーネントで構成されている実際のマルチタスクシステム(スレッドコードまたはOS自体によって支配されている)の場合です。各機能のコードを複数のプラグインに分割し、必要に応じてロードすることもできます。
ジェネレータベースのマイクロスレッドを使って協調的マルチタスクを実装したパッケージがKamaeliaで、この記事はそれに大いに触発されています。その他のソリューションとしては、現在最も利用されているマイクロスレッド対応の Python ライブラリであり、修正なしの標準的な Python インタープリタで動作する greenlet や、マイクロスレッドをネイティブに実装した Python のフォークである Stackless Python などが挙げられます。
Part 3 of the Python generators - from iterators to cooperative multitasking series
前回までの記事